package org.ghost.springboot.demo.util.http;

import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Consts;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.auth.AuthSchemeProvider;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.AuthSchemes;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.config.ConnectionConfig;
import org.apache.http.config.Lookup;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.impl.auth.*;
import org.apache.http.impl.client.BasicCookieStore;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.nio.conn.NoopIOSessionStrategy;
import org.apache.http.nio.conn.SchemeIOSessionStrategy;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOReactorException;
import org.apache.http.ssl.SSLContexts;
import org.apache.http.util.EntityUtils;
import org.ghost.springboot.demo.dto.Tuple2;
import org.ghost.springboot.demo.util.ExceptionUtil;
import org.ghost.springboot.demo.util.JacksonUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.util.StreamUtils;
import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl;

import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.charset.Charset;
import java.nio.charset.CodingErrorAction;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * http://hc.apache.org/httpcomponents-asyncclient-dev/examples.html
 */
public final class HttpAsyncClientHelper {
    private static final Logger logger = LoggerFactory.getLogger(HttpAsyncClientHelper.class);
    /**
     * Socket timeout while waiting for data: 5 seconds
     * (consider increasing it when the remote endpoint is slow).
     */
    private static int SOCKET_TIMEOUT = 5000;
    /**
     * Connect timeout.
     */
    private static int CONNECT_TIMEOUT = 5000;
    /**
     * Connection request timeout.
     */
    private static int CONNECT_REQUEST_TIMEOUT = 2000;
    /**
     * Maximum number of connections in the connection pool.
     */
    private static int POOL_SIZE = 3000;
    /**
     * Maximum number of concurrent connections per host: 1500.
     */
    private static int MAX_PER_ROUTE = 1500;

    /**
     * Builds the request configuration used by the clients created in this helper.
     *
     * @return a {@link RequestConfig} carrying the connect, socket and connection-request timeouts
     */
    public static RequestConfig getRequestConfig() {
        RequestConfig.Builder timeouts = RequestConfig.custom();
        timeouts.setConnectTimeout(CONNECT_TIMEOUT);
        timeouts.setSocketTimeout(SOCKET_TIMEOUT);
        timeouts.setConnectionRequestTimeout(CONNECT_REQUEST_TIMEOUT);
        return timeouts.build();
    }

    /**
     * Creates a pooling NIO connection manager configured from this class's pool settings.
     * <p>
     * Registers session strategies for {@code http} and {@code https}, uses one I/O thread per
     * available processor, applies the pool limits, and installs a UTF-8 connection config that
     * ignores malformed and unmappable input.
     *
     * @return the configured connection manager
     * @throws IOReactorException if the connecting I/O reactor cannot be created
     */
    public static PoolingNHttpClientConnectionManager getPoolingNHttpClientConnectionManager() throws IOReactorException {
        // Session strategies keyed by URL scheme; https is backed by the default SSL context.
        RegistryBuilder<SchemeIOSessionStrategy> strategyBuilder = RegistryBuilder.<SchemeIOSessionStrategy>create();
        strategyBuilder.register("http", NoopIOSessionStrategy.INSTANCE);
        strategyBuilder.register("https", new SSLIOSessionStrategy(SSLContexts.createDefault()));
        Registry<SchemeIOSessionStrategy> strategies = strategyBuilder.build();

        // I/O thread configuration: one thread per available processor.
        int ioThreads = Runtime.getRuntime().availableProcessors();
        ConnectingIOReactor reactor = new DefaultConnectingIOReactor(
                IOReactorConfig.custom().setIoThreadCount(ioThreads).build());

        // Connection pool sizing.
        PoolingNHttpClientConnectionManager manager =
                new PoolingNHttpClientConnectionManager(reactor, null, strategies, null);
        if (POOL_SIZE > 0) {
            manager.setMaxTotal(POOL_SIZE);
        }
        // Fall back to 10 connections per route when no positive limit is configured.
        manager.setDefaultMaxPerRoute(MAX_PER_ROUTE > 0 ? MAX_PER_ROUTE : 10);

        manager.setDefaultConnectionConfig(ConnectionConfig.custom()
                .setMalformedInputAction(CodingErrorAction.IGNORE)
                .setUnmappableInputAction(CodingErrorAction.IGNORE)
                .setCharset(Consts.UTF_8)
                .build());

        return manager;
    }

    /**
     * Creates an async client backed by the pooled connection manager, with the BASIC, DIGEST,
     * NTLM, SPNEGO and KERBEROS auth schemes registered, a fresh cookie store and the default
     * request config of this helper.
     *
     * @return a new, not yet started, {@link CloseableHttpAsyncClient}
     * @throws IOReactorException if the connection manager's I/O reactor cannot be created
     */
    public static CloseableHttpAsyncClient createAsyncClient() throws IOReactorException {
        PoolingNHttpClientConnectionManager pool = getPoolingNHttpClientConnectionManager();

        RegistryBuilder<AuthSchemeProvider> schemes = RegistryBuilder.<AuthSchemeProvider>create();
        schemes.register(AuthSchemes.BASIC, new BasicSchemeFactory());
        schemes.register(AuthSchemes.DIGEST, new DigestSchemeFactory());
        schemes.register(AuthSchemes.NTLM, new NTLMSchemeFactory());
        schemes.register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory());
        schemes.register(AuthSchemes.KERBEROS, new KerberosSchemeFactory());
        Lookup<AuthSchemeProvider> authSchemes = schemes.build();

        return HttpAsyncClients.custom()
                .setConnectionManager(pool)
                .setDefaultAuthSchemeRegistry(authSchemes)
                .setDefaultCookieStore(new BasicCookieStore())
                .setDefaultRequestConfig(getRequestConfig())
                .build();
    }

    /**
     * Creates an async client like {@link #createAsyncClient()} that additionally offers the given
     * username/password credentials for any authentication scope ({@link AuthScope#ANY}).
     *
     * @param userName user name for the credentials
     * @param password password for the credentials
     * @return a new, not yet started, {@link CloseableHttpAsyncClient}
     * @throws IOReactorException if the connection manager's I/O reactor cannot be created
     */
    public static CloseableHttpAsyncClient createAsyncClient(String userName, String password) throws IOReactorException {
        CredentialsProvider provider = new BasicCredentialsProvider();
        provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password));

        PoolingNHttpClientConnectionManager pool = getPoolingNHttpClientConnectionManager();

        RegistryBuilder<AuthSchemeProvider> schemes = RegistryBuilder.<AuthSchemeProvider>create();
        schemes.register(AuthSchemes.BASIC, new BasicSchemeFactory());
        schemes.register(AuthSchemes.DIGEST, new DigestSchemeFactory());
        schemes.register(AuthSchemes.NTLM, new NTLMSchemeFactory());
        schemes.register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory());
        schemes.register(AuthSchemes.KERBEROS, new KerberosSchemeFactory());
        Lookup<AuthSchemeProvider> authSchemes = schemes.build();

        return HttpAsyncClients.custom()
                .setConnectionManager(pool)
                .setDefaultCredentialsProvider(provider)
                .setDefaultAuthSchemeRegistry(authSchemes)
                .setDefaultCookieStore(new BasicCookieStore())
                .setDefaultRequestConfig(getRequestConfig())
                .build();
    }

    /**
     * Creates an async client that routes requests through the given proxy and offers the given
     * username/password credentials.
     * <p>
     * The client uses the pooled connection manager, registers the BASIC, DIGEST, NTLM, SPNEGO and
     * KERBEROS auth schemes, and applies the timeouts from {@link #getRequestConfig()}.
     *
     * @param host     proxy host name
     * @param port     proxy port
     * @param userName user name for the credentials
     * @param password password for the credentials
     * @return a new, not yet started, {@link CloseableHttpAsyncClient}
     * @throws IOReactorException if the connection manager's I/O reactor cannot be created
     */
    public static CloseableHttpAsyncClient createAsyncClientProxy(String host, int port, String userName, String password) throws IOReactorException {
        UsernamePasswordCredentials credentials = new UsernamePasswordCredentials(userName, password);
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        // NOTE(review): AuthScope.ANY offers these credentials to every host, not only the proxy.
        credentialsProvider.setCredentials(AuthScope.ANY, credentials);

        PoolingNHttpClientConnectionManager conMgr = getPoolingNHttpClientConnectionManager();

        Lookup<AuthSchemeProvider> authSchemeRegistry = RegistryBuilder
                .<AuthSchemeProvider>create()
                .register(AuthSchemes.BASIC, new BasicSchemeFactory())
                .register(AuthSchemes.DIGEST, new DigestSchemeFactory())
                .register(AuthSchemes.NTLM, new NTLMSchemeFactory())
                .register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory())
                .register(AuthSchemes.KERBEROS, new KerberosSchemeFactory())
                .build();

        // The timeouts come from getRequestConfig(); a duplicate, never-used local config was removed.
        return HttpAsyncClients.custom().setConnectionManager(conMgr)
                .setDefaultCredentialsProvider(credentialsProvider)
                .setDefaultAuthSchemeRegistry(authSchemeRegistry)
                .setProxy(new HttpHost(host, port))
                .setDefaultCookieStore(new BasicCookieStore())
                .setDefaultRequestConfig(getRequestConfig()).build();
    }

    /*返回结果-begin******************************************************************************************************************/
    /**
     * Executes the request on a default async client, waits for the response and deserializes the
     * response text into {@code cls}.
     * <p>
     * The client is always closed and the exchange is always passed to {@code writeLog} before
     * this method returns.
     *
     * @param builder supplies the request to execute
     * @param cls     target class for JSON deserialization
     * @param <T>     result type
     * @return the deserialized response, or {@code null} when the response text is blank or the
     *         exchange fails or is interrupted
     */
    public static <T> T invoke(HttpRequestBuilder builder, Class<T> cls) {
        long begin = System.currentTimeMillis();
        Tuple2<Integer, String> tupleRsp = new Tuple2<Integer, String>(HttpStatus.OK.value(), "");
        CloseableHttpAsyncClient httpClient = HttpAsyncClients.createDefault();
        try {
            httpClient.start();
            Future<HttpResponse> future = httpClient.execute(builder.getHttpRequestBase(), null);
            HttpResponse response = future.get();

            tupleRsp = getResponse(response);
            if (StringUtils.isNotBlank(tupleRsp.getSecond())) {
                return JacksonUtil.nonEmptyMapper().fromJson(tupleRsp.getSecond(), cls);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            logger.error("*****HttpAsyncClientHelper.invoke出现错误:" + e.getMessage(), e);
        } catch (ExecutionException e) {
            logger.error("*****HttpAsyncClientHelper.invoke出现错误:" + e.getMessage(), e);
        } finally {
            try {
                httpClient.close();
            } catch (IOException e) {
                logger.error("*****HttpAsyncClientHelper.invoke-httpClient.close出现错误:" + e.getMessage(), e);
            }
            writeLog(builder.getHttpRequestBase(), begin, tupleRsp);
        }

        return null;
    }

    /**
     * Executes the request on a default async client, waits for the response and deserializes the
     * response text into the type described by {@code typeReference}.
     * <p>
     * The client is always closed and the exchange is always passed to {@code writeLog} before
     * this method returns.
     *
     * @param builder       supplies the request to execute
     * @param typeReference target type for JSON deserialization
     * @param <T>           result type
     * @return the deserialized response, or {@code null} when the response text is blank or the
     *         exchange fails or is interrupted
     */
    public static <T> T invoke(HttpRequestBuilder builder, TypeReference<T> typeReference) {
        long begin = System.currentTimeMillis();
        Tuple2<Integer, String> tupleRsp = new Tuple2<Integer, String>(HttpStatus.OK.value(), "");
        CloseableHttpAsyncClient httpClient = HttpAsyncClients.createDefault();
        try {
            httpClient.start();
            Future<HttpResponse> future = httpClient.execute(builder.getHttpRequestBase(), null);
            HttpResponse response = future.get();

            tupleRsp = getResponse(response);
            if (StringUtils.isNotBlank(tupleRsp.getSecond())) {
                return JacksonUtil.nonEmptyMapper().fromJson(tupleRsp.getSecond(), typeReference);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            logger.error("*****HttpAsyncClientHelper.invoke出现错误:" + e.getMessage(), e);
        } catch (ExecutionException e) {
            logger.error("*****HttpAsyncClientHelper.invoke出现错误:" + e.getMessage(), e);
        } finally {
            try {
                httpClient.close();
            } catch (IOException e) {
                logger.error("*****HttpAsyncClientHelper.invoke-httpClient.close出现错误:" + e.getMessage(), e);
            }
            writeLog(builder.getHttpRequestBase(), begin, tupleRsp);
        }

        return null;
    }

    /**
     * Executes the request on a default async client, waits for the response and deserializes the
     * response text into the given parameterized type.
     * <p>
     * The client is always closed and the exchange is always passed to {@code writeLog} before
     * this method returns.
     *
     * @param builder           supplies the request to execute
     * @param parameterizedType target generic type for JSON deserialization
     * @param <T>               result type
     * @return the deserialized response, or {@code null} when the response text is blank or the
     *         exchange fails or is interrupted
     */
    public static <T> T invoke(HttpRequestBuilder builder, ParameterizedType parameterizedType) {
        long begin = System.currentTimeMillis();
        // Adapter that hands the caller-supplied type to the JSON mapper.
        TypeReference<T> typeReference = new TypeReference<T>() {
            @Override
            public Type getType() {
                return parameterizedType;
            }
        };
        Tuple2<Integer, String> tupleRsp = new Tuple2<Integer, String>(HttpStatus.OK.value(), "");
        CloseableHttpAsyncClient httpClient = HttpAsyncClients.createDefault();
        try {
            httpClient.start();
            Future<HttpResponse> future = httpClient.execute(builder.getHttpRequestBase(), null);
            HttpResponse response = future.get();

            tupleRsp = getResponse(response);
            if (StringUtils.isNotBlank(tupleRsp.getSecond())) {
                return JacksonUtil.nonEmptyMapper().fromJson(tupleRsp.getSecond(), typeReference);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            logger.error("*****HttpAsyncClientHelper.invoke出现错误:" + e.getMessage(), e);
        } catch (ExecutionException e) {
            logger.error("*****HttpAsyncClientHelper.invoke出现错误:" + e.getMessage(), e);
        } finally {
            try {
                httpClient.close();
            } catch (IOException e) {
                logger.error("*****HttpAsyncClientHelper.invoke-httpClient.close出现错误:" + e.getMessage(), e);
            }
            writeLog(builder.getHttpRequestBase(), begin, tupleRsp);
        }

        return null;
    }

    /**
     * Executes the request on a default async client, waits for the response and deserializes the
     * response text into the parameterized type assembled from {@code rawType}, {@code ownerType}
     * and {@code argType}.
     * <p>
     * The client is always closed and the exchange is always passed to {@code writeLog} before
     * this method returns.
     *
     * @param builder   supplies the request to execute
     * @param rawType   raw class of the target generic type
     * @param ownerType owner type of the target generic type
     * @param argType   actual type arguments of the target generic type
     * @param <T>       result type
     * @return the deserialized response, or {@code null} when the response text is blank or the
     *         exchange fails or is interrupted
     */
    public static <T> T invoke(HttpRequestBuilder builder, Class rawType, Type ownerType, Type... argType) {
        long begin = System.currentTimeMillis();
        // NOTE(review): ParameterizedTypeImpl lives in a sun.* package, i.e. it is a JDK-internal class.
        ParameterizedType parameterizedType = ParameterizedTypeImpl.make(rawType, argType, ownerType);
        TypeReference<T> typeReference = new TypeReference<T>() {
            @Override
            public Type getType() {
                return parameterizedType;
            }
        };
        Tuple2<Integer, String> tupleRsp = new Tuple2<Integer, String>(HttpStatus.OK.value(), "");
        CloseableHttpAsyncClient httpClient = HttpAsyncClients.createDefault();
        try {
            httpClient.start();
            Future<HttpResponse> future = httpClient.execute(builder.getHttpRequestBase(), null);
            HttpResponse response = future.get();

            tupleRsp = getResponse(response);
            if (StringUtils.isNotBlank(tupleRsp.getSecond())) {
                return JacksonUtil.nonEmptyMapper().fromJson(tupleRsp.getSecond(), typeReference);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            logger.error("*****HttpAsyncClientHelper.invoke出现错误:" + e.getMessage(), e);
        } catch (ExecutionException e) {
            logger.error("*****HttpAsyncClientHelper.invoke出现错误:" + e.getMessage(), e);
        } finally {
            try {
                httpClient.close();
            } catch (IOException e) {
                logger.error("*****HttpAsyncClientHelper.invoke-httpClient.close出现错误:" + e.getMessage(), e);
            }
            writeLog(builder.getHttpRequestBase(), begin, tupleRsp);
        }

        return null;
    }
    /*返回结果-end******************************************************************************************************************/

    /*单条回调-begin******************************************************************************************************************/
    /**
     * Executes a single request on a default async client and reports the outcome to
     * {@code callback}, deserializing the response text into {@code cls}.
     * <p>
     * The method blocks until the exchange has completed, failed or been cancelled and the
     * corresponding callback has run; only then is the client closed. Closing the client right
     * after submitting the request would shut it down before the response could arrive.
     * <p>
     * {@code callback.completed} is only invoked when the response text is not blank.
     *
     * @param requestBuilder supplies the request to execute
     * @param cls            target class for JSON deserialization
     * @param callback       receives the result, failure or cancellation; may be {@code null}
     * @param <T>            result type
     */
    public static <T> void invoke(HttpRequestBuilder requestBuilder, Class<T> cls, FutureCallback<T> callback) {
        final long begin = System.currentTimeMillis();
        // Released at the very end of whichever callback fires, so close() never races the exchange.
        final CountDownLatch done = new CountDownLatch(1);
        CloseableHttpAsyncClient httpClient = HttpAsyncClients.createDefault();
        try {
            httpClient.start();
            httpClient.execute(requestBuilder.getHttpRequestBase(), new FutureCallback<HttpResponse>() {
                @Override
                public void completed(final HttpResponse response) {
                    try {
                        if (callback != null) {
                            Tuple2<Integer, String> tupleRsp = getResponse(response);
                            if (StringUtils.isNotBlank(tupleRsp.getSecond())) {
                                callback.completed(JacksonUtil.nonEmptyMapper().fromJson(tupleRsp.getSecond(), cls));
                            }
                            writeLog(requestBuilder.getHttpRequestBase(), begin, tupleRsp);
                        }
                    } finally {
                        done.countDown();
                    }
                }

                @Override
                public void failed(final Exception ex) {
                    try {
                        if (callback != null) {
                            callback.failed(ex);
                        }
                        writeLog(requestBuilder.getHttpRequestBase(), begin, ex.getMessage() + ExceptionUtil.getExceptionStack(ex));
                    } finally {
                        done.countDown();
                    }
                }

                @Override
                public void cancelled() {
                    try {
                        if (callback != null) {
                            callback.cancelled();
                        }
                        writeLog(requestBuilder.getHttpRequestBase(), begin, "cancelled");
                    } finally {
                        done.countDown();
                    }
                }
            });
            // Keep the client alive until the exchange has reached a terminal state.
            done.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag; the finally block below still closes the client.
            Thread.currentThread().interrupt();
            logger.error("*****HttpAsyncClientHelper.invoke出现错误:" + e.getMessage(), e);
        } finally {
            try {
                httpClient.close();
            } catch (IOException e) {
                logger.error("*****HttpAsyncClientHelper.invoke-httpClient.close出现错误:" + e.getMessage(), e);
            }
        }
    }

    /**
     * Executes a single request on a default async client and reports the outcome to
     * {@code callback}, deserializing the response text into the type described by
     * {@code typeReference}.
     * <p>
     * The method blocks until the exchange has completed, failed or been cancelled and the
     * corresponding callback has run; only then is the client closed. Closing the client right
     * after submitting the request would shut it down before the response could arrive.
     * <p>
     * {@code callback.completed} is only invoked when the response text is not blank.
     *
     * @param requestBuilder supplies the request to execute
     * @param typeReference  target type for JSON deserialization
     * @param callback       receives the result, failure or cancellation; may be {@code null}
     * @param <T>            result type
     */
    public static <T> void invoke(HttpRequestBuilder requestBuilder, TypeReference<T> typeReference, FutureCallback<T> callback) {
        final long begin = System.currentTimeMillis();
        // Released at the very end of whichever callback fires, so close() never races the exchange.
        final CountDownLatch done = new CountDownLatch(1);
        CloseableHttpAsyncClient httpClient = HttpAsyncClients.createDefault();
        try {
            httpClient.start();
            httpClient.execute(requestBuilder.getHttpRequestBase(), new FutureCallback<HttpResponse>() {
                @Override
                public void completed(final HttpResponse response) {
                    try {
                        if (callback != null) {
                            Tuple2<Integer, String> tupleRsp = getResponse(response);
                            if (StringUtils.isNotBlank(tupleRsp.getSecond())) {
                                callback.completed(JacksonUtil.nonEmptyMapper().fromJson(tupleRsp.getSecond(), typeReference));
                            }
                            writeLog(requestBuilder.getHttpRequestBase(), begin, tupleRsp);
                        }
                    } finally {
                        done.countDown();
                    }
                }

                @Override
                public void failed(final Exception ex) {
                    try {
                        if (callback != null) {
                            callback.failed(ex);
                        }
                        writeLog(requestBuilder.getHttpRequestBase(), begin, ex.getMessage() + ExceptionUtil.getExceptionStack(ex));
                    } finally {
                        done.countDown();
                    }
                }

                @Override
                public void cancelled() {
                    try {
                        if (callback != null) {
                            callback.cancelled();
                        }
                        writeLog(requestBuilder.getHttpRequestBase(), begin, "cancelled");
                    } finally {
                        done.countDown();
                    }
                }
            });
            // Keep the client alive until the exchange has reached a terminal state.
            done.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag; the finally block below still closes the client.
            Thread.currentThread().interrupt();
            logger.error("*****HttpAsyncClientHelper.invoke出现错误:" + e.getMessage(), e);
        } finally {
            try {
                httpClient.close();
            } catch (IOException e) {
                logger.error("*****HttpAsyncClientHelper.invoke-httpClient.close出现错误:" + e.getMessage(), e);
            }
        }
    }

    /**
     * Executes a single request on a default async client and reports the outcome to
     * {@code callback}, deserializing the response text into the parameterized type assembled
     * from {@code rawType}, {@code ownerType} and {@code argType}.
     * <p>
     * The method blocks until the exchange has completed, failed or been cancelled and the
     * corresponding callback has run; only then is the client closed. Closing the client right
     * after submitting the request would shut it down before the response could arrive.
     * <p>
     * {@code callback.completed} is only invoked when the response text is not blank.
     *
     * @param requestBuilder supplies the request to execute
     * @param rawType        raw class of the target generic type
     * @param ownerType      owner type of the target generic type
     * @param argType        actual type arguments of the target generic type
     * @param callback       receives the result, failure or cancellation; may be {@code null}
     * @param <T>            result type
     */
    public static <T> void invoke(HttpRequestBuilder requestBuilder, Class rawType, Type ownerType, Type[] argType, FutureCallback<T> callback) {
        final long begin = System.currentTimeMillis();
        // NOTE(review): ParameterizedTypeImpl lives in a sun.* package, i.e. it is a JDK-internal class.
        final ParameterizedType parameterizedType = ParameterizedTypeImpl.make(rawType, argType, ownerType);
        final TypeReference<T> typeReference = new TypeReference<T>() {
            @Override
            public Type getType() {
                return parameterizedType;
            }
        };
        // Released at the very end of whichever callback fires, so close() never races the exchange.
        final CountDownLatch done = new CountDownLatch(1);
        CloseableHttpAsyncClient httpClient = HttpAsyncClients.createDefault();
        try {
            httpClient.start();
            httpClient.execute(requestBuilder.getHttpRequestBase(), new FutureCallback<HttpResponse>() {
                @Override
                public void completed(final HttpResponse response) {
                    try {
                        if (callback != null) {
                            Tuple2<Integer, String> tupleRsp = getResponse(response);
                            if (StringUtils.isNotBlank(tupleRsp.getSecond())) {
                                callback.completed(JacksonUtil.nonEmptyMapper().fromJson(tupleRsp.getSecond(), typeReference));
                            }
                            writeLog(requestBuilder.getHttpRequestBase(), begin, tupleRsp);
                        }
                    } finally {
                        done.countDown();
                    }
                }

                @Override
                public void failed(final Exception ex) {
                    try {
                        if (callback != null) {
                            callback.failed(ex);
                        }
                        writeLog(requestBuilder.getHttpRequestBase(), begin, ex.getMessage() + ExceptionUtil.getExceptionStack(ex));
                    } finally {
                        done.countDown();
                    }
                }

                @Override
                public void cancelled() {
                    try {
                        if (callback != null) {
                            callback.cancelled();
                        }
                        writeLog(requestBuilder.getHttpRequestBase(), begin, "cancelled");
                    } finally {
                        done.countDown();
                    }
                }
            });
            // Keep the client alive until the exchange has reached a terminal state.
            done.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag; the finally block below still closes the client.
            Thread.currentThread().interrupt();
            logger.error("*****HttpAsyncClientHelper.invoke出现错误:" + e.getMessage(), e);
        } finally {
            try {
                httpClient.close();
            } catch (IOException e) {
                logger.error("*****HttpAsyncClientHelper.invoke-httpClient.close出现错误:" + e.getMessage(), e);
            }
        }
    }

    /**
     * Executes a single request on a default async client and reports the outcome to
     * {@code callback}, deserializing the response text into the given parameterized type.
     * <p>
     * The method blocks until the exchange has completed, failed or been cancelled and the
     * corresponding callback has run; only then is the client closed. Closing the client right
     * after submitting the request would shut it down before the response could arrive.
     * <p>
     * {@code callback.completed} is only invoked when the response text is not blank.
     *
     * @param requestBuilder    supplies the request to execute
     * @param parameterizedType target generic type for JSON deserialization
     * @param callback          receives the result, failure or cancellation; may be {@code null}
     * @param <T>               result type
     */
    public static <T> void invoke(HttpRequestBuilder requestBuilder, ParameterizedType parameterizedType, FutureCallback<T> callback) {
        final long begin = System.currentTimeMillis();
        // Adapter that hands the caller-supplied type to the JSON mapper.
        final TypeReference<T> typeReference = new TypeReference<T>() {
            @Override
            public Type getType() {
                return parameterizedType;
            }
        };
        // Released at the very end of whichever callback fires, so close() never races the exchange.
        final CountDownLatch done = new CountDownLatch(1);
        CloseableHttpAsyncClient httpClient = HttpAsyncClients.createDefault();
        try {
            httpClient.start();
            httpClient.execute(requestBuilder.getHttpRequestBase(), new FutureCallback<HttpResponse>() {
                @Override
                public void completed(final HttpResponse response) {
                    try {
                        if (callback != null) {
                            Tuple2<Integer, String> tupleRsp = getResponse(response);
                            if (StringUtils.isNotBlank(tupleRsp.getSecond())) {
                                callback.completed(JacksonUtil.nonEmptyMapper().fromJson(tupleRsp.getSecond(), typeReference));
                            }
                            writeLog(requestBuilder.getHttpRequestBase(), begin, tupleRsp);
                        }
                    } finally {
                        done.countDown();
                    }
                }

                @Override
                public void failed(final Exception ex) {
                    try {
                        if (callback != null) {
                            callback.failed(ex);
                        }
                        writeLog(requestBuilder.getHttpRequestBase(), begin, ex.getMessage() + ExceptionUtil.getExceptionStack(ex));
                    } finally {
                        done.countDown();
                    }
                }

                @Override
                public void cancelled() {
                    try {
                        if (callback != null) {
                            callback.cancelled();
                        }
                        writeLog(requestBuilder.getHttpRequestBase(), begin, "cancelled");
                    } finally {
                        done.countDown();
                    }
                }
            });
            // Keep the client alive until the exchange has reached a terminal state.
            done.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag; the finally block below still closes the client.
            Thread.currentThread().interrupt();
            logger.error("*****HttpAsyncClientHelper.invoke出现错误:" + e.getMessage(), e);
        } finally {
            try {
                httpClient.close();
            } catch (IOException e) {
                logger.error("*****HttpAsyncClientHelper.invoke-httpClient.close出现错误:" + e.getMessage(), e);
            }
        }
    }
    /*单条回调-end******************************************************************************************************************/

    /*批量回调-begin******************************************************************************************************************/
    /**
     * Executes all requests concurrently on one pooled async client and hands every non-blank
     * response, deserialized into {@code cls}, to {@code callback}.
     * <p>
     * The method blocks until every request has completed, failed or been cancelled and its
     * callback code has finished; the client is then closed. Does nothing for a {@code null} or
     * empty list.
     *
     * @param builderList requests to execute
     * @param cls         target class for JSON deserialization
     * @param callback    receives each deserialized response; may be {@code null}
     * @param <T>         result type
     */
    public static <T> void invoke(List<HttpRequestBuilder> builderList, Class<T> cls, HttpResultCallback<T> callback) {
        if (!CollectionUtils.isNotEmpty(builderList)) {
            return;
        }
        final long begin = System.currentTimeMillis();
        final CountDownLatch latch = new CountDownLatch(builderList.size());
        CloseableHttpAsyncClient httpClient = null;
        try {
            httpClient = createAsyncClient();
            httpClient.start();
            for (final HttpRequestBuilder requestBuilder : builderList) {
                httpClient.execute(requestBuilder.getHttpRequestBase(), new FutureCallback<HttpResponse>() {
                    @Override
                    public void completed(final HttpResponse response) {
                        try {
                            if (callback != null) {
                                Tuple2<Integer, String> tupleRsp = getResponse(response);
                                if (StringUtils.isNotBlank(tupleRsp.getSecond())) {
                                    callback.handle(JacksonUtil.nonEmptyMapper().fromJson(tupleRsp.getSecond(), cls));
                                }
                                writeLog(requestBuilder.getHttpRequestBase(), begin, tupleRsp);
                            }
                        } finally {
                            // Count down last so invoke() cannot return while the handler is still running.
                            latch.countDown();
                        }
                    }

                    @Override
                    public void failed(final Exception ex) {
                        try {
                            writeLog(requestBuilder.getHttpRequestBase(), begin, ex.getMessage() + ExceptionUtil.getExceptionStack(ex));
                        } finally {
                            latch.countDown();
                        }
                    }

                    @Override
                    public void cancelled() {
                        try {
                            writeLog(requestBuilder.getHttpRequestBase(), begin, "cancelled");
                        } finally {
                            latch.countDown();
                        }
                    }
                });
            }
            latch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag; the finally block below still closes the client.
            Thread.currentThread().interrupt();
            logger.error("*****HttpAsyncClientHelper.invoke出现错误:" + e.getMessage(), e);
        } catch (IOReactorException e) {
            logger.error("*****HttpAsyncClientHelper.invoke出现错误:" + e.getMessage(), e);
        } finally {
            try {
                if (httpClient != null) {
                    httpClient.close();
                }
            } catch (IOException e) {
                logger.error("*****HttpAsyncClientHelper.invoke-httpClient.close出现错误:" + e.getMessage(), e);
            }
        }
    }

    /**
     * Executes all requests concurrently on one pooled async client and hands every non-blank
     * response, deserialized into the type described by {@code typeReference}, to
     * {@code callback}.
     * <p>
     * The method blocks until every request has completed, failed or been cancelled and its
     * callback code has finished; the client is then closed. Does nothing for a {@code null} or
     * empty list.
     *
     * @param builderList   requests to execute
     * @param typeReference target type for JSON deserialization
     * @param callback      receives each deserialized response; may be {@code null}
     * @param <T>           result type
     */
    public static <T> void invoke(List<HttpRequestBuilder> builderList, TypeReference<T> typeReference, HttpResultCallback<T> callback) {
        if (!CollectionUtils.isNotEmpty(builderList)) {
            return;
        }
        final long begin = System.currentTimeMillis();
        final CountDownLatch latch = new CountDownLatch(builderList.size());
        CloseableHttpAsyncClient httpClient = null;
        try {
            httpClient = createAsyncClient();
            httpClient.start();
            for (final HttpRequestBuilder requestBuilder : builderList) {
                httpClient.execute(requestBuilder.getHttpRequestBase(), new FutureCallback<HttpResponse>() {
                    @Override
                    public void completed(final HttpResponse response) {
                        try {
                            if (callback != null) {
                                Tuple2<Integer, String> tupleRsp = getResponse(response);
                                if (StringUtils.isNotBlank(tupleRsp.getSecond())) {
                                    callback.handle(JacksonUtil.nonEmptyMapper().fromJson(tupleRsp.getSecond(), typeReference));
                                }
                                writeLog(requestBuilder.getHttpRequestBase(), begin, tupleRsp);
                            }
                        } finally {
                            // Count down last so invoke() cannot return while the handler is still running.
                            latch.countDown();
                        }
                    }

                    @Override
                    public void failed(final Exception ex) {
                        try {
                            writeLog(requestBuilder.getHttpRequestBase(), begin, ex.getMessage() + ExceptionUtil.getExceptionStack(ex));
                        } finally {
                            latch.countDown();
                        }
                    }

                    @Override
                    public void cancelled() {
                        try {
                            writeLog(requestBuilder.getHttpRequestBase(), begin, "cancelled");
                        } finally {
                            latch.countDown();
                        }
                    }
                });
            }
            latch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag; the finally block below still closes the client.
            Thread.currentThread().interrupt();
            logger.error("*****HttpAsyncClientHelper.invoke出现错误:" + e.getMessage(), e);
        } catch (IOReactorException e) {
            logger.error("*****HttpAsyncClientHelper.invoke出现错误:" + e.getMessage(), e);
        } finally {
            try {
                if (httpClient != null) {
                    httpClient.close();
                }
            } catch (IOException e) {
                logger.error("*****HttpAsyncClientHelper.invoke-httpClient.close出现错误:" + e.getMessage(), e);
            }
        }
    }

    /**
     * Executes all requests concurrently on one pooled async client and hands every non-blank
     * response, deserialized into the parameterized type assembled from {@code rawType},
     * {@code ownerType} and {@code argType}, to {@code callback}.
     * <p>
     * The method blocks until every request has completed, failed or been cancelled and its
     * callback code has finished; the client is then closed. Does nothing for a {@code null} or
     * empty list.
     *
     * @param builderList requests to execute
     * @param rawType     raw class of the target generic type
     * @param ownerType   owner type of the target generic type
     * @param argType     actual type arguments of the target generic type
     * @param callback    receives each deserialized response; may be {@code null}
     * @param <T>         result type
     */
    public static <T> void invoke(List<HttpRequestBuilder> builderList, Class rawType, Type ownerType, Type[] argType, HttpResultCallback<T> callback) {
        if (!CollectionUtils.isNotEmpty(builderList)) {
            return;
        }
        final long begin = System.currentTimeMillis();
        // NOTE(review): ParameterizedTypeImpl lives in a sun.* package, i.e. it is a JDK-internal class.
        final ParameterizedType parameterizedType = ParameterizedTypeImpl.make(rawType, argType, ownerType);
        final TypeReference<T> typeReference = new TypeReference<T>() {
            @Override
            public Type getType() {
                return parameterizedType;
            }
        };
        final CountDownLatch latch = new CountDownLatch(builderList.size());
        CloseableHttpAsyncClient httpClient = null;
        try {
            httpClient = createAsyncClient();
            httpClient.start();
            for (final HttpRequestBuilder requestBuilder : builderList) {
                httpClient.execute(requestBuilder.getHttpRequestBase(), new FutureCallback<HttpResponse>() {
                    @Override
                    public void completed(final HttpResponse response) {
                        try {
                            if (callback != null) {
                                Tuple2<Integer, String> tupleRsp = getResponse(response);
                                if (StringUtils.isNotBlank(tupleRsp.getSecond())) {
                                    callback.handle(JacksonUtil.nonEmptyMapper().fromJson(tupleRsp.getSecond(), typeReference));
                                }
                                writeLog(requestBuilder.getHttpRequestBase(), begin, tupleRsp);
                            }
                        } finally {
                            // Count down last so invoke() cannot return while the handler is still running.
                            latch.countDown();
                        }
                    }

                    @Override
                    public void failed(final Exception ex) {
                        try {
                            writeLog(requestBuilder.getHttpRequestBase(), begin, ex.getMessage() + ExceptionUtil.getExceptionStack(ex));
                        } finally {
                            latch.countDown();
                        }
                    }

                    @Override
                    public void cancelled() {
                        try {
                            writeLog(requestBuilder.getHttpRequestBase(), begin, "cancelled");
                        } finally {
                            latch.countDown();
                        }
                    }
                });
            }
            latch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag; the finally block below still closes the client.
            Thread.currentThread().interrupt();
            logger.error("*****HttpAsyncClientHelper.invoke出现错误:" + e.getMessage(), e);
        } catch (IOReactorException e) {
            logger.error("*****HttpAsyncClientHelper.invoke出现错误:" + e.getMessage(), e);
        } finally {
            try {
                if (httpClient != null) {
                    httpClient.close();
                }
            } catch (IOException e) {
                logger.error("*****HttpAsyncClientHelper.invoke-httpClient.close出现错误:" + e.getMessage(), e);
            }
        }
    }

    /**
     * Executes all requests concurrently on one pooled async client and hands every non-blank
     * response, deserialized into the given parameterized type, to {@code callback}.
     * <p>
     * The method blocks until every request has completed, failed or been cancelled and its
     * callback code has finished; the client is then closed. Does nothing for a {@code null} or
     * empty list, matching the other batch overloads.
     *
     * @param builderList       requests to execute
     * @param parameterizedType target generic type for JSON deserialization
     * @param callback          receives each deserialized response; may be {@code null}
     * @param <T>               result type
     */
    public static <T> void invoke(List<HttpRequestBuilder> builderList, ParameterizedType parameterizedType, HttpResultCallback<T> callback) {
        // Guard against null/empty input instead of failing on builderList.size().
        if (!CollectionUtils.isNotEmpty(builderList)) {
            return;
        }
        final long begin = System.currentTimeMillis();
        // Adapter that hands the caller-supplied type to the JSON mapper.
        final TypeReference<T> typeReference = new TypeReference<T>() {
            @Override
            public Type getType() {
                return parameterizedType;
            }
        };
        final CountDownLatch latch = new CountDownLatch(builderList.size());
        CloseableHttpAsyncClient httpClient = null;
        try {
            httpClient = createAsyncClient();
            httpClient.start();
            for (final HttpRequestBuilder requestBuilder : builderList) {
                httpClient.execute(requestBuilder.getHttpRequestBase(), new FutureCallback<HttpResponse>() {
                    @Override
                    public void completed(final HttpResponse response) {
                        try {
                            if (callback != null) {
                                Tuple2<Integer, String> tupleRsp = getResponse(response);
                                if (StringUtils.isNotBlank(tupleRsp.getSecond())) {
                                    callback.handle(JacksonUtil.nonEmptyMapper().fromJson(tupleRsp.getSecond(), typeReference));
                                }
                                writeLog(requestBuilder.getHttpRequestBase(), begin, tupleRsp);
                            }
                        } finally {
                            // Count down last so invoke() cannot return while the handler is still running.
                            latch.countDown();
                        }
                    }

                    @Override
                    public void failed(final Exception ex) {
                        try {
                            writeLog(requestBuilder.getHttpRequestBase(), begin, ex.getMessage() + ExceptionUtil.getExceptionStack(ex));
                        } finally {
                            latch.countDown();
                        }
                    }

                    @Override
                    public void cancelled() {
                        try {
                            writeLog(requestBuilder.getHttpRequestBase(), begin, "cancelled");
                        } finally {
                            latch.countDown();
                        }
                    }
                });
            }
            latch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag; the finally block below still closes the client.
            Thread.currentThread().interrupt();
            logger.error("*****HttpAsyncClientHelper.invoke出现错误:" + e.getMessage(), e);
        } catch (IOReactorException e) {
            logger.error("*****HttpAsyncClientHelper.invoke出现错误:" + e.getMessage(), e);
        } finally {
            try {
                if (httpClient != null) {
                    httpClient.close();
                }
            } catch (IOException e) {
                logger.error("*****HttpAsyncClientHelper.invoke-httpClient.close出现错误:" + e.getMessage(), e);
            }
        }
    }
    /*批量回调-end******************************************************************************************************************/

    /*结果解析-begin******************************************************************************************************************/
    /**
     * Reads the body of {@code response} and deserializes it into an instance of {@code cls}.
     *
     * @param response HTTP response to read
     * @param cls      target class of the deserialization
     * @param <T>      result type
     * @return the deserialized object, or {@code null} when no non-blank body was obtained
     */
    public static <T> T getResponse(HttpResponse response, Class<T> cls) {
        final String body = getResponse(response).getSecond();
        if (StringUtils.isBlank(body)) {
            return null;
        }
        return JacksonUtil.nonEmptyMapper().fromJson(body, cls);
    }

    /**
     * Reads the body of {@code response} and deserializes it as the type described by
     * {@code typeReference}.
     *
     * @param response      HTTP response to read
     * @param typeReference Jackson type reference describing the target type
     * @param <T>           result type
     * @return the deserialized object, or {@code null} when no non-blank body was obtained
     */
    public static <T> T getResponse(HttpResponse response, TypeReference<T> typeReference) {
        final String body = getResponse(response).getSecond();
        if (StringUtils.isBlank(body)) {
            return null;
        }
        return JacksonUtil.nonEmptyMapper().fromJson(body, typeReference);
    }

    /**
     * Reads the body of {@code response} and deserializes it as {@code parameterizedType}.
     *
     * @param response          HTTP response to read
     * @param parameterizedType generic target type of the deserialization
     * @param <T>               result type
     * @return the deserialized object, or {@code null} when no non-blank body was obtained
     */
    public static <T> T getResponse(HttpResponse response, ParameterizedType parameterizedType) {
        final String body = getResponse(response).getSecond();
        if (StringUtils.isBlank(body)) {
            return null;
        }
        // Wrap the runtime type in a TypeReference so the mapper resolves it as-is.
        final TypeReference<T> targetType = new TypeReference<T>() {
            @Override
            public Type getType() {
                return parameterizedType;
            }
        };
        return JacksonUtil.nonEmptyMapper().fromJson(body, targetType);
    }

    /**
     * Reads the body of {@code response} and deserializes it as the parameterized type built
     * from {@code rawType}, {@code ownerType} and {@code argType}.
     *
     * @param response  HTTP response to read
     * @param rawType   raw class of the target type
     * @param ownerType owner type passed to the parameterized-type factory
     * @param argType   actual type arguments of the target type
     * @param <T>       result type
     * @return the deserialized object, or {@code null} when no non-blank body was obtained
     */
    public static <T> T getResponse(HttpResponse response, Class rawType, Type ownerType, Type... argType) {
        final String body = getResponse(response).getSecond();
        if (StringUtils.isBlank(body)) {
            return null;
        }
        // The parameterized type is only built once a body is known to exist.
        final ParameterizedType composedType = ParameterizedTypeImpl.make(rawType, argType, ownerType);
        final TypeReference<T> targetType = new TypeReference<T>() {
            @Override
            public Type getType() {
                return composedType;
            }
        };
        return JacksonUtil.nonEmptyMapper().fromJson(body, targetType);
    }

    /**
     * Extracts the status code and, for 2xx responses that carry an entity, the body text.
     *
     * @param response HTTP response to inspect; may be {@code null}
     * @return a tuple of (status code, body). The status is 500 when {@code response} or its
     *         status line is {@code null}. The body is {@code null} for non-2xx statuses, for
     *         responses without an entity, and when reading the entity fails with an
     *         {@link IOException}.
     */
    private static Tuple2<Integer, String> getResponse(HttpResponse response) {
        int status = HttpStatus.INTERNAL_SERVER_ERROR.value();
        if (response != null && response.getStatusLine() != null) {
            status = response.getStatusLine().getStatusCode();
            // Some endpoints answer with 204 or 206, so every 2xx status is treated as success.
            boolean success = status >= HttpStatus.OK.value() && status < HttpStatus.MULTIPLE_CHOICES.value();
            // A successful response may carry no entity at all (e.g. 204 No Content), and
            // EntityUtils.toString rejects a null entity, so only read when one is present.
            if (success && response.getEntity() != null) {
                try {
                    // UTF-8 is used only when the entity does not declare a charset itself.
                    return new Tuple2<Integer, String>(status, EntityUtils.toString(response.getEntity(), Consts.UTF_8));
                } catch (IOException e) {
                    logger.error("*****HttpAsyncClientHelper.getResponse出现错误:", e);
                }
            }
        }

        return new Tuple2<Integer, String>(status, null);
    }

    /*结果解析-end******************************************************************************************************************/

    /**
     * Logs the outcome of a request together with a free-form message (for example a failure
     * description or {@code "cancelled"}). For entity-enclosing requests the request body is
     * re-read from the entity and included in the log line.
     *
     * @param httpRequestBase request that was executed
     * @param begin           start time in epoch milliseconds, used to compute the elapsed seconds
     * @param message         text describing the outcome
     */
    private static void writeLog(HttpRequestBase httpRequestBase, Long begin, String message) {
        long end = System.currentTimeMillis();
        if (httpRequestBase instanceof HttpEntityEnclosingRequestBase) {
            HttpEntityEnclosingRequestBase temp = (HttpEntityEnclosingRequestBase) httpRequestBase;
            try {
                String requestBody = "";
                if (temp.getEntity() != null) {
                    // StreamUtils.copyToString leaves the stream open, so close it here.
                    try (java.io.InputStream content = temp.getEntity().getContent()) {
                        if (content != null) {
                            requestBody = StreamUtils.copyToString(content, Charset.forName("UTF-8"));
                        }
                    }
                }
                logger.info("*****method:{},url:{},time:{},request:{},message:{}", httpRequestBase.getMethod(), httpRequestBase.getURI().toString(), (end - begin) / 1000.0, requestBody, message);
            } catch (IOException e) {
                // The throwable is passed as the trailing argument without its own placeholder.
                logger.error("*****HttpAsyncClientHelper.writeLog出现错误,错误信息: {}", e.getMessage(), e);
            }
        } else {
            // Placeholders match the four arguments; there is no status value in this overload.
            logger.info("*****method:{},url:{},time:{},message:{}", httpRequestBase.getMethod(), httpRequestBase.getURI().toString(), (end - begin) / 1000.0, message);
        }
    }

    /**
     * Logs the outcome of a completed request together with its response status and body.
     * For entity-enclosing requests the request body is re-read from the entity and included
     * in the log line.
     *
     * @param httpRequestBase request that was executed
     * @param begin           start time in epoch milliseconds, used to compute the elapsed seconds
     * @param tupleRsp        response as (status code, body text)
     */
    private static void writeLog(HttpRequestBase httpRequestBase, Long begin, Tuple2<Integer, String> tupleRsp) {
        long end = System.currentTimeMillis();
        if (httpRequestBase instanceof HttpEntityEnclosingRequestBase) {
            HttpEntityEnclosingRequestBase temp = (HttpEntityEnclosingRequestBase) httpRequestBase;
            try {
                String requestBody = "";
                if (temp.getEntity() != null) {
                    // StreamUtils.copyToString leaves the stream open, so close it here.
                    try (java.io.InputStream content = temp.getEntity().getContent()) {
                        if (content != null) {
                            requestBody = StreamUtils.copyToString(content, Charset.forName("UTF-8"));
                        }
                    }
                }
                logger.info("*****method:{},url:{},time:{},status:{},request:{},response:{}", httpRequestBase.getMethod(), httpRequestBase.getURI().toString(), (end - begin) / 1000.0, tupleRsp.getFirst(), requestBody, tupleRsp.getSecond());
            } catch (IOException e) {
                // The throwable is passed as the trailing argument without its own placeholder.
                logger.error("*****HttpAsyncClientHelper.writeLog出现错误,错误信息: {}", e.getMessage(), e);
            }
        } else {
            logger.info("*****method:{},url:{},time:{},status:{},response:{}", httpRequestBase.getMethod(), httpRequestBase.getURI().toString(), (end - begin) / 1000.0, tupleRsp.getFirst(), tupleRsp.getSecond());
        }
    }

}

